use std::ops::Bound;

use diesel::{
    pg::types::sql_types,
    prelude::*,
    sql_query,
    sql_types::{Binary, Integer, Jsonb, Nullable},
    PgConnection, QueryDsl, RunQueryDsl,
};

use graph::{
    anyhow::Context,
    components::store::StoredDynamicDataSource,
    constraint_violation,
    data_source::CausalityRegion,
    prelude::{serde_json, BlockNumber, StoreError},
};

use crate::primary::Namespace;

/// A dynamically described table: it is named by a `String` and lives in the
/// schema identified by a `Namespace`.
type DynTable = diesel_dynamic_schema::Table<String, Namespace>;
/// A column of a `DynTable` with a static name and the SQL type `ST`.
type DynColumn<ST> = diesel_dynamic_schema::Column<DynTable, &'static str, ST>;

/// Handle for the `data_sources$` table of one `Namespace`.
///
/// It bundles the dynamic table with typed handles for the columns that the
/// queries in this file select, filter or order by, plus the qualified table
/// name used when building raw SQL.
#[derive(Debug)]
pub(crate) struct DataSourcesTable {
    /// Schema the table lives in.
    namespace: Namespace,
    /// Qualified name `<namespace>.data_sources$`, used in raw SQL statements.
    qname: String,
    /// Dynamic table handle used to build diesel queries.
    table: DynTable,
    /// `vid` column; rows are ordered by it when loading.
    vid: DynColumn<Integer>,
    /// `block_range` column, an integer range.
    block_range: DynColumn<sql_types::Range<Integer>>,
    /// `causality_region` column.
    causality_region: DynColumn<Integer>,
    /// `manifest_idx` column.
    manifest_idx: DynColumn<Integer>,
    /// `param` column, nullable bytes.
    param: DynColumn<Nullable<Binary>>,
    /// `context` column, nullable JSON.
    context: DynColumn<Nullable<Jsonb>>,
    /// `done_at` column, nullable integer.
    done_at: DynColumn<Nullable<Integer>>,
}

impl DataSourcesTable {
    /// Unqualified name of the table; it is combined with the namespace to
    /// form the qualified name used in SQL.
    const TABLE_NAME: &'static str = "data_sources$";

    pub(crate) fn new(namespace: Namespace) -> Self {
        let table =
            diesel_dynamic_schema::schema(namespace.clone()).table(Self::TABLE_NAME.to_string());

        DataSourcesTable {
            qname: format!("{}.{}", namespace, Self::TABLE_NAME),
            namespace,
            vid: table.column("vid"),
            block_range: table.column("block_range"),
            causality_region: table.column("causality_region"),
            manifest_idx: table.column("manifest_idx"),
            param: table.column("param"),
            context: table.column("context"),
            done_at: table.column("done_at"),
            table,
        }
    }

    /// Return the DDL that creates this table and its indexes.
    ///
    /// The statements create `<namespace>.data_sources$`, a GiST index on
    /// `block_range` and an index on `causality_region`. Every occurrence of
    /// the table name is derived from `TABLE_NAME` so the table and its
    /// indexes cannot drift apart.
    pub(crate) fn as_ddl(&self) -> String {
        format!(
            "
            create table {nsp}.{table} (
                vid integer generated by default as identity primary key,
                block_range int4range not null,
                causality_region integer not null,
                manifest_idx integer not null,
                parent integer references {nsp}.{table},
                id bytea,
                param bytea,
                context jsonb,
                done_at int
            );

            create index gist_block_range_{table} on {nsp}.{table} using gist (block_range);
            create index btree_causality_region_{table} on {nsp}.{table} (causality_region);
            ",
            nsp = self.namespace,
            table = Self::TABLE_NAME
        )
    }

    // Query to load the data sources which are live at `block`. Ordering by the creation block and
    // `vid` makes sure they are in insertion order which is important for the correctness of
    // reverts and the execution order of triggers. See also 8f1bca33-d3b7-4035-affc-fd6161a12448.
    pub(super) fn load(
        &self,
        conn: &PgConnection,
        block: BlockNumber,
    ) -> Result<Vec<StoredDynamicDataSource>, StoreError> {
        type Tuple = (
            (Bound<i32>, Bound<i32>),
            i32,
            Option<Vec<u8>>,
            Option<serde_json::Value>,
            CausalityRegion,
            Option<i32>,
        );
        let tuples = self
            .table
            .clone()
            .filter(diesel::dsl::sql("block_range @> ").bind::<Integer, _>(block))
            .select((
                &self.block_range,
                &self.manifest_idx,
                &self.param,
                &self.context,
                &self.causality_region,
                &self.done_at,
            ))
            .order_by(&self.vid)
            .load::<Tuple>(conn)?;

        let mut dses: Vec<_> = tuples
            .into_iter()
            .map(
                |(block_range, manifest_idx, param, context, causality_region, done_at)| {
                    let creation_block = match block_range.0 {
                        Bound::Included(block) => Some(block),

                        // Should never happen.
                        Bound::Excluded(_) | Bound::Unbounded => {
                            unreachable!("dds with open creation")
                        }
                    };

                    StoredDynamicDataSource {
                        manifest_idx: manifest_idx as u32,
                        param: param.map(|p| p.into()),
                        context,
                        creation_block,
                        done_at,
                        causality_region,
                    }
                },
            )
            .collect();

        // This sort is stable and `tuples` was ordered by vid, so `dses` will be ordered by `(creation_block, vid)`.
        dses.sort_by_key(|v| v.creation_block);

        Ok(dses)
    }

    /// Insert `data_sources`, all of which must have been created at `block`.
    ///
    /// Each row is stored with the block range `int4range(creation_block, null)`,
    /// i.e. starting at its creation block and open-ended. Returns the total
    /// number of rows inserted.
    ///
    /// # Errors
    ///
    /// Returns a constraint violation if any entry's `creation_block` is not
    /// `Some(block)`. This is checked for all entries before the first row is
    /// written. Database errors are propagated.
    pub(crate) fn insert(
        &self,
        conn: &PgConnection,
        data_sources: &[StoredDynamicDataSource],
        block: BlockNumber,
    ) -> Result<usize, StoreError> {
        // Validate the whole batch first so that a bad entry is reported
        // before anything is written.
        if let Some(ds) = data_sources
            .iter()
            .find(|ds| ds.creation_block != Some(block))
        {
            return Err(constraint_violation!(
                "mismatching creation blocks `{:?}` and `{}`",
                ds.creation_block,
                block
            ));
        }

        // The statement text is the same for every row, so build it once.
        //
        // Offchain data sources have a unique causality region assigned from a sequence in the
        // database, while onchain data sources always have causality region 0.
        let query = format!(
            "insert into {}(block_range, manifest_idx, param, context, causality_region, done_at) \
                            values (int4range($1, null), $2, $3, $4, $5, $6)",
            self.qname
        );

        let mut inserted_total = 0;

        for ds in data_sources {
            let StoredDynamicDataSource {
                manifest_idx,
                param,
                context,
                creation_block,
                done_at,
                causality_region,
            } = ds;

            inserted_total += sql_query(&query)
                .bind::<Nullable<Integer>, _>(creation_block)
                .bind::<Integer, _>(*manifest_idx as i32)
                .bind::<Nullable<Binary>, _>(param.as_ref().map(|p| &**p))
                .bind::<Nullable<Jsonb>, _>(context)
                .bind::<Integer, _>(causality_region)
                .bind::<Nullable<Integer>, _>(done_at)
                .execute(conn)?;
        }

        Ok(inserted_total)
    }

    /// Delete all data sources that were created at or after `block`.
    ///
    /// `block` is treated as the first block to get rid of. Deleting only the
    /// rows created exactly at `block` would leave data sources from later
    /// blocks behind when more than one block is reverted in a single call.
    ///
    /// # Errors
    ///
    /// Propagates database errors.
    pub(crate) fn revert(&self, conn: &PgConnection, block: BlockNumber) -> Result<(), StoreError> {
        // Use the 'does not extend to the left of' operator `&>` to leverage the gist index;
        // it is equivalent to `lower(block_range) >= $1`.
        //
        // This assumes all ranges are of the form [x, +inf): rows are only ever deleted here,
        // a range with an upper bound is never re-opened.
        let query = format!(
            "delete from {} where block_range &> int4range($1, null)",
            self.qname
        );
        sql_query(query).bind::<Integer, _>(block).execute(conn)?;
        Ok(())
    }

    /// Copy the dynamic data sources from `self` to `dst`. All data sources that
    /// were created up to and including `target_block` will be copied.
    ///
    /// If `dst` already contains any rows, nothing is copied and the number of
    /// rows already present is returned; otherwise the number of rows inserted
    /// is returned.
    ///
    /// The manifest index of each row is translated by looking up its name in
    /// `src_manifest_idx_and_name` and then finding that name in
    /// `dst_manifest_idx_and_name`.
    ///
    /// # Errors
    ///
    /// Fails if a row's manifest index is missing from
    /// `src_manifest_idx_and_name`, if the corresponding name is missing from
    /// `dst_manifest_idx_and_name`, or on database errors.
    pub(crate) fn copy_to(
        &self,
        conn: &PgConnection,
        dst: &DataSourcesTable,
        target_block: BlockNumber,
        src_manifest_idx_and_name: &[(i32, String)],
        dst_manifest_idx_and_name: &[(i32, String)],
    ) -> Result<usize, StoreError> {
        // Check if there are any data sources for dst which indicates we already copied
        let count = dst.table.clone().count().get_result::<i64>(conn)?;
        if count > 0 {
            return Ok(count as usize);
        }

        type Tuple = (
            (Bound<i32>, Bound<i32>),
            i32,
            Option<Vec<u8>>,
            Option<serde_json::Value>,
            i32,
            Option<i32>,
        );

        let src_tuples = self
            .table
            .clone()
            .filter(diesel::dsl::sql("lower(block_range) <= ").bind::<Integer, _>(target_block))
            .select((
                &self.block_range,
                &self.manifest_idx,
                &self.param,
                &self.context,
                &self.causality_region,
                &self.done_at,
            ))
            .order_by(&self.vid)
            .load::<Tuple>(conn)?;

        // The statement text does not depend on the row being copied, so it
        // is built once. A range whose upper bound is at or before the target
        // block is copied unchanged; any other range is stored as open-ended
        // from its original lower bound.
        let query = format!(
            "\
             insert into {dst}(block_range, manifest_idx, param, context, causality_region, done_at)
             values(case
                 when upper($2) <= $1 then $2
                 else int4range(lower($2), null)
             end,
             $3, $4, $5, $6, $7)
             ",
            dst = dst.qname
        );

        let mut count = 0;
        for (block_range, src_manifest_idx, param, context, causality_region, done_at) in src_tuples
        {
            let name = &src_manifest_idx_and_name
                .iter()
                .find(|(idx, _)| idx == &src_manifest_idx)
                .context("manifest_idx not found in src")?
                .1;
            let dst_manifest_idx = dst_manifest_idx_and_name
                .iter()
                .find(|(_, n)| n == name)
                .context("name not found in dst")?
                .0;

            count += sql_query(&query)
                .bind::<Integer, _>(target_block)
                .bind::<sql_types::Range<Integer>, _>(block_range)
                .bind::<Integer, _>(dst_manifest_idx)
                .bind::<Nullable<Binary>, _>(param)
                .bind::<Nullable<Jsonb>, _>(context)
                .bind::<Integer, _>(causality_region)
                .bind::<Nullable<Integer>, _>(done_at)
                .execute(conn)?;
        }

        // If the manifest idxes remained constant, we can test that both tables have the same
        // contents.
        if src_manifest_idx_and_name == dst_manifest_idx_and_name {
            debug_assert!(
                self.load(conn, target_block).map_err(|e| e.to_string())
                    == dst.load(conn, target_block).map_err(|e| e.to_string())
            );
        }

        Ok(count)
    }

    /// Set `done_at` for each of the given offchain data sources.
    ///
    /// Rows are matched by causality region, which currently uniquely
    /// identifies an offchain data source. For every entry of `data_sources`
    /// the row with the same `causality_region` gets the entry's `done_at`.
    ///
    /// # Errors
    ///
    /// Returns a constraint violation if a single update touched more than
    /// one row. That check runs after the statement has executed, so undoing
    /// the update is left to the caller. Database errors are propagated.
    pub(super) fn update_offchain_status(
        &self,
        conn: &PgConnection,
        data_sources: &[StoredDynamicDataSource],
    ) -> Result<(), StoreError> {
        // The statement text is the same for every data source; build it once.
        let query = format!(
            "update {} set done_at = $1 where causality_region = $2",
            self.qname
        );

        for ds in data_sources {
            let count = sql_query(&query)
                .bind::<Nullable<Integer>, _>(ds.done_at)
                .bind::<Integer, _>(ds.causality_region)
                .execute(conn)?;

            if count > 1 {
                return Err(constraint_violation!(
                    "expected to remove at most one offchain data source but would remove {}, causality region: {}",
                    count,
                    ds.causality_region
                ));
            }
        }

        Ok(())
    }

    /// The current causality sequence according to the store, which is inferred to be the
    /// maximum `causality_region` value existing in the table. Returns `None` when the query
    /// yields no row.
    pub(super) fn causality_region_curr_val(
        &self,
        conn: &PgConnection,
    ) -> Result<Option<CausalityRegion>, StoreError> {
        // Order descending and take the first row to obtain the maximum, so that the btree
        // index on `causality_region` can be leveraged.
        let max_region = self
            .table
            .clone()
            .select(&self.causality_region)
            .order_by((&self.causality_region).desc())
            .first::<CausalityRegion>(conn)
            .optional()?;

        Ok(max_region)
    }
}
